Merge pull request #104 from cantino/transactionally_schedule

Add better event handling (expiration) and transactions

Andrew Cantino 11 gadi atpakaļ
vecāks
revīzija
fa492b75c8

+ 24 - 22
app/models/agent.rb

@@ -199,30 +199,32 @@ class Agent < ActiveRecord::Base
199 199
     end
200 200
 
201 201
     def receive!
202
-      sql = Agent.
203
-              select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
204
-              joins("JOIN links ON (links.receiver_id = agents.id)").
205
-              joins("JOIN agents AS sources ON (links.source_id = sources.id)").
206
-              joins("JOIN events ON (events.agent_id = sources.id)").
207
-              where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
208
-
209
-      agents_to_events = {}
210
-      Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
211
-        agents_to_events[receiver_agent_id] ||= []
212
-        agents_to_events[receiver_agent_id] << event_id
213
-      end
214
-
215
-      event_ids = agents_to_events.values.flatten.uniq.compact
216
-
217
-      Agent.where(:id => agents_to_events.keys).each do |agent|
218
-        agent.update_attribute :last_checked_event_id, event_ids.max
219
-        Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
220
-      end
221
-
222
-      {
202
+      Agent.transaction do
203
+        sql = Agent.
204
+                select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
205
+                joins("JOIN links ON (links.receiver_id = agents.id)").
206
+                joins("JOIN agents AS sources ON (links.source_id = sources.id)").
207
+                joins("JOIN events ON (events.agent_id = sources.id)").
208
+                where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
209
+
210
+        agents_to_events = {}
211
+        Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
212
+          agents_to_events[receiver_agent_id] ||= []
213
+          agents_to_events[receiver_agent_id] << event_id
214
+        end
215
+
216
+        event_ids = agents_to_events.values.flatten.uniq.compact
217
+
218
+        Agent.where(:id => agents_to_events.keys).each do |agent|
219
+          agent.update_attribute :last_checked_event_id, event_ids.max
220
+          Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
221
+        end
222
+
223
+        {
223 224
           :agent_count => agents_to_events.keys.length,
224 225
           :event_count => event_ids.length
225
-      }
226
+        }
227
+      end
226 228
     end
227 229
 
228 230
     # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then

+ 5 - 1
app/models/event.rb

@@ -1,5 +1,5 @@
1 1
 class Event < ActiveRecord::Base
2
-  attr_accessible :lat, :lng, :payload, :user_id, :user
2
+  attr_accessible :lat, :lng, :payload, :user_id, :user, :expires_at
3 3
 
4 4
   acts_as_mappable
5 5
 
@@ -21,4 +21,8 @@ class Event < ActiveRecord::Base
21 21
   def reemit!
22 22
     agent.create_event :payload => payload, :lat => lat, :lng => lng
23 23
   end
24
+
25
+  def self.cleanup_expired!
26
+    Event.where("expires_at IS NOT NULL AND expires_at < ?", Time.now).delete_all
27
+  end
24 28
 end

+ 60 - 34
bin/schedule.rb

@@ -10,57 +10,83 @@ end
10 10
 
11 11
 require 'rufus/scheduler'
12 12
 
13
-def run_schedule(time, mutex)
14
-  ActiveRecord::Base.connection_pool.with_connection do
15
-    mutex.synchronize do
13
+class HuginnScheduler
14
+  attr_accessor :mutex
15
+
16
+  def run_schedule(time)
17
+    with_mutex do
16 18
       puts "Queuing schedule for #{time}"
17 19
       Agent.delay.run_schedule(time)
18 20
     end
19 21
   end
20
-end
21 22
 
22
-def propogate!(mutex)
23
-  ActiveRecord::Base.connection_pool.with_connection do
24
-    mutex.synchronize do
23
+  def propagate!
24
+    with_mutex do
25 25
       puts "Queuing event propagation"
26 26
       Agent.delay.receive!
27 27
     end
28 28
   end
29
-end
30 29
 
31
-mutex = Mutex.new
30
+  def cleanup_expired_events!
31
+    with_mutex do
32
+      puts "Running event cleanup"
33
+      Event.delay.cleanup_expired!
34
+    end
35
+  end
32 36
 
33
-scheduler = Rufus::Scheduler.new
37
+  def with_mutex
38
+    ActiveRecord::Base.connection_pool.with_connection do
39
+      mutex.synchronize do
40
+        yield
41
+      end
42
+    end
43
+  end
34 44
 
35
-# Schedule event propagation.
45
+  def run!
46
+    self.mutex = Mutex.new
36 47
 
37
-scheduler.every '5m' do
38
-  propogate!(mutex)
39
-end
48
+    rufus_scheduler = Rufus::Scheduler.new
40 49
 
41
-# Schedule repeating events.
50
+    # Schedule event propagation.
42 51
 
43
-%w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
44
-  scheduler.every schedule do
45
-    run_schedule "every_#{schedule}", mutex
46
-  end
47
-end
52
+    rufus_scheduler.every '1m' do
53
+      propagate!
54
+    end
55
+
56
+    # Schedule event cleanup.
57
+
58
+    rufus_scheduler.cron "0 0 * * * America/Los_Angeles" do
59
+      cleanup_expired_events!
60
+    end
61
+
62
+    # Schedule repeating events.
48 63
 
49
-# Schedule events for specific times.
50
-
51
-# Times are assumed to be in PST for now.  Can store a user#timezone later.
52
-24.times do |hour|
53
-  scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
54
-    if hour == 0
55
-      run_schedule "midnight", mutex
56
-    elsif hour < 12
57
-      run_schedule "#{hour}am", mutex
58
-    elsif hour == 12
59
-      run_schedule "noon", mutex
60
-    else
61
-      run_schedule "#{hour - 12}pm", mutex
64
+    %w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
65
+      rufus_scheduler.every schedule do
66
+        run_schedule "every_#{schedule}"
67
+      end
62 68
     end
69
+
70
+    # Schedule events for specific times.
71
+
72
+    # Times are assumed to be in PST for now.  Can store a user#timezone later.
73
+    24.times do |hour|
74
+      rufus_scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
75
+        if hour == 0
76
+          run_schedule "midnight"
77
+        elsif hour < 12
78
+          run_schedule "#{hour}am"
79
+        elsif hour == 12
80
+          run_schedule "noon"
81
+        else
82
+          run_schedule "#{hour - 12}pm"
83
+        end
84
+      end
85
+    end
86
+
87
+    rufus_scheduler.join
63 88
   end
64 89
 end
65 90
 
66
-scheduler.join
91
+scheduler = HuginnScheduler.new
92
+scheduler.run!

+ 6 - 0
db/migrate/20131105063248_add_expires_at_to_events.rb

@@ -0,0 +1,6 @@
1
+class AddExpiresAtToEvents < ActiveRecord::Migration
2
+  def change
3
+    add_column :events, :expires_at, :datetime
4
+    add_index :events, :expires_at
5
+  end
6
+end

+ 3 - 1
db/schema.rb

@@ -11,7 +11,7 @@
11 11
 #
12 12
 # It's strongly recommended to check this file into your version control system.
13 13
 
14
-ActiveRecord::Schema.define(:version => 20130819160603) do
14
+ActiveRecord::Schema.define(:version => 20131105063248) do
15 15
 
16 16
   create_table "agent_logs", :force => true do |t|
17 17
     t.integer  "agent_id",                         :null => false
@@ -67,9 +67,11 @@ ActiveRecord::Schema.define(:version => 20130819160603) do
67 67
     t.text     "payload",    :limit => 16777215
68 68
     t.datetime "created_at",                                                     :null => false
69 69
     t.datetime "updated_at",                                                     :null => false
70
+    t.datetime "expires_at"
70 71
   end
71 72
 
72 73
   add_index "events", ["agent_id", "created_at"], :name => "index_events_on_agent_id_and_created_at"
74
+  add_index "events", ["expires_at"], :name => "index_events_on_expires_at"
73 75
   add_index "events", ["user_id", "created_at"], :name => "index_events_on_user_id_and_created_at"
74 76
 
75 77
   create_table "links", :force => true do |t|

+ 1 - 1
spec/models/agent_spec.rb

@@ -186,7 +186,7 @@ describe Agent do
186 186
         }
187 187
         Agent.async_check(agents(:bob_weather_agent).id)
188 188
         lambda {
189
-          Agent.receive!
189
+          Agent.async_receive(agents(:bob_rain_notifier_agent).id, [agents(:bob_weather_agent).events.last.id])
190 190
         }.should raise_error
191 191
         log = agents(:bob_rain_notifier_agent).logs.first
192 192
         log.message.should =~ /Exception/

+ 34 - 0
spec/models/event_spec.rb

@@ -16,4 +16,38 @@ describe Event do
16 16
       Event.last.created_at.should be_within(1).of(Time.now)
17 17
     end
18 18
   end
19
+
20
+  describe ".cleanup_expired!" do
21
+    it "removes any Events whose expired_at date is non-null and in the past" do
22
+      event = agents(:jane_weather_agent).create_event :expires_at => 2.hours.from_now
23
+
24
+      current_time = Time.now
25
+      stub(Time).now { current_time }
26
+
27
+      Event.cleanup_expired!
28
+      Event.find_by_id(event.id).should_not be_nil
29
+      current_time = 119.minutes.from_now
30
+      Event.cleanup_expired!
31
+      Event.find_by_id(event.id).should_not be_nil
32
+      current_time = 2.minutes.from_now
33
+      Event.cleanup_expired!
34
+      Event.find_by_id(event.id).should be_nil
35
+    end
36
+
37
+    it "doesn't touch Events with no expired_at" do
38
+      event = Event.new
39
+      event.agent = agents(:jane_weather_agent)
40
+      event.expires_at = nil
41
+      event.save!
42
+
43
+      current_time = Time.now
44
+      stub(Time).now { current_time }
45
+
46
+      Event.cleanup_expired!
47
+      Event.find_by_id(event.id).should_not be_nil
48
+      current_time = 2.days.from_now
49
+      Event.cleanup_expired!
50
+      Event.find_by_id(event.id).should_not be_nil
51
+    end
52
+  end
19 53
 end